/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.inlong.sort.iceberg.catalog.hybris;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import com.qcloud.dlc.metastore.DLCDataCatalogMetastoreClient;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
import org.apache.iceberg.BaseMetastoreCatalog;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.ClientPool;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The only changed point is HiveClientPool, from
 * {@link HiveMetaStoreClient} to {@link DLCDataCatalogMetastoreClient}
 */
public class DlcWrappedHybrisCatalog extends BaseMetastoreCatalog implements SupportsNamespaces, Configurable {
    public static final String LIST_ALL_TABLES = "list-all-tables";
    public static final String LIST_ALL_TABLES_DEFAULT = "false";

    // dlc auth
    public static final String DLC_ENDPOINT = "qcloud.dlc.endpoint";


    private static final Logger LOG = LoggerFactory.getLogger(DlcWrappedHybrisCatalog.class);

    private String name;
    private Configuration conf;
    private FileIO fileIO;
    private ClientPool<IMetaStoreClient, TException> clients;
    private boolean listAllTables = false;

    public DlcWrappedHybrisCatalog() {
    }

    @Override
    public void initialize(String inputName, Map<String, String> properties) {
        this.name = inputName;
        if (conf == null) {
            LOG.warn("No Hadoop Configuration was set, using the default environment Configuration");
            this.conf = new Configuration();
        }

        if (properties.containsKey(CatalogProperties.URI)) {
            this.conf.set(DLC_ENDPOINT, properties.get(CatalogProperties.URI));
        }

        if (properties.containsKey(CatalogProperties.WAREHOUSE_LOCATION)) {
            this.conf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname,
                    properties.get(CatalogProperties.WAREHOUSE_LOCATION));
        }

        // dlc auth
        properties.entrySet().stream()
                .filter(entry -> entry.getKey().startsWith("qcloud.dlc"))
                .forEach(entry -> this.conf.set(entry.getKey(), entry.getValue()));

        // lakefs auth
        properties.entrySet().stream()
                .filter(entry -> entry.getKey().startsWith("fs"))
                .forEach(entry -> this.conf.set(entry.getKey(), entry.getValue()));

        this.listAllTables = Boolean.parseBoolean(properties.getOrDefault(LIST_ALL_TABLES, LIST_ALL_TABLES_DEFAULT));

        String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL);
        this.fileIO = fileIOImpl == null
                ? new HadoopFileIO(conf) : CatalogUtil.loadFileIO(fileIOImpl, properties, conf);

        this.clients = new CachedClientPool(conf, properties);
    }

    @Override
    public List<TableIdentifier> listTables(Namespace namespace) {
        Preconditions.checkArgument(isValidateNamespace(namespace),
                "Missing database in namespace: %s", namespace);
        String database = namespace.level(0);

        try {
            List<String> tableNames = clients.run(client -> client.getAllTables(database));
            List<TableIdentifier> tableIdentifiers;

            if (listAllTables) {
                tableIdentifiers = tableNames.stream()
                        .map(t -> TableIdentifier.of(namespace, t))
                        .collect(Collectors.toList());
            } else {
                List<Table> tableObjects = clients.run(client -> client.getTableObjectsByName(database, tableNames));
                tableIdentifiers = tableObjects.stream()
                        .filter(table -> table.getParameters() != null
                                && BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE
                                    .equalsIgnoreCase(
                                            table.getParameters().get(BaseMetastoreTableOperations.TABLE_TYPE_PROP)))
                        .map(table -> TableIdentifier.of(namespace, table.getTableName()))
                        .collect(Collectors.toList());
            }

            LOG.debug("Listing of namespace: {} resulted in the following tables: {}", namespace, tableIdentifiers);
            return tableIdentifiers;

        } catch (UnknownDBException e) {
            throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace);

        } catch (TException e) {
            throw new RuntimeException("Failed to list all tables under namespace " + namespace, e);

        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted in call to listTables", e);
        }
    }

    @Override
    public String name() {
        return name;
    }

    @Override
    public boolean dropTable(TableIdentifier identifier, boolean purge) {
        if (!isValidIdentifier(identifier)) {
            return false;
        }

        String database = identifier.namespace().level(0);

        TableOperations ops = newTableOps(identifier);
        TableMetadata lastMetadata;
        if (purge && ops.current() != null) {
            lastMetadata = ops.current();
        } else {
            lastMetadata = null;
        }

        try {
            clients.run(client -> {
                client.dropTable(database, identifier.name(),
                        false /* do not delete data */,
                        false /* throw NoSuchObjectException if the table doesn't exist */);
                return null;
            });

            if (purge && lastMetadata != null) {
                CatalogUtil.dropTableData(ops.io(), lastMetadata);
            }

            LOG.info("Dropped table: {}", identifier);
            return true;

        } catch (NoSuchTableException | NoSuchObjectException e) {
            LOG.info("Skipping drop, table does not exist: {}", identifier, e);
            return false;

        } catch (TException e) {
            throw new RuntimeException("Failed to drop " + identifier, e);

        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted in call to dropTable", e);
        }
    }

    @Override
    public void renameTable(TableIdentifier from, TableIdentifier originalTo) {
        if (!isValidIdentifier(from)) {
            throw new NoSuchTableException("Invalid identifier: %s", from);
        }

        TableIdentifier to = removeCatalogName(originalTo);
        Preconditions.checkArgument(isValidIdentifier(to), "Invalid identifier: %s", to);

        String toDatabase = to.namespace().level(0);
        String fromDatabase = from.namespace().level(0);
        String fromName = from.name();

        try {
            Table table = clients.run(client -> client.getTable(fromDatabase, fromName));
            HiveTableOperations.validateTableIsIceberg(table, fullTableName(name, from));

            table.setDbName(toDatabase);
            table.setTableName(to.name());

            clients.run(client -> {
                client.alter_table(fromDatabase, fromName, table);
                return null;
            });

            LOG.info("Renamed table from {}, to {}", from, to);

        } catch (NoSuchObjectException e) {
            throw new NoSuchTableException("Table does not exist: %s", from);

        } catch (AlreadyExistsException e) {
            throw new org.apache.iceberg.exceptions.AlreadyExistsException("Table already exists: %s", to);

        } catch (TException e) {
            throw new RuntimeException("Failed to rename " + from + " to " + to, e);

        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted in call to rename", e);
        }
    }

    @Override
    public org.apache.iceberg.Table registerTable(TableIdentifier identifier, String metadataFileLocation) {
        Preconditions.checkArgument(isValidIdentifier(identifier), "Invalid identifier: %s", identifier);

        // Throw an exception if this table already exists in the catalog.
        if (tableExists(identifier)) {
            throw new org.apache.iceberg.exceptions.AlreadyExistsException("Table already exists: %s", identifier);
        }

        TableOperations ops = newTableOps(identifier);
        InputFile metadataFile = fileIO.newInputFile(metadataFileLocation);
        TableMetadata metadata = TableMetadataParser.read(ops.io(), metadataFile);
        ops.commit(null, metadata);

        return new BaseTable(ops, identifier.toString());
    }

    @Override
    public void createNamespace(Namespace namespace, Map<String, String> meta) {
        Preconditions.checkArgument(
                !namespace.isEmpty(),
                "Cannot create namespace with invalid name: %s", namespace);
        Preconditions.checkArgument(isValidateNamespace(namespace),
                "Cannot support multi part namespace in Hive Metastore: %s", namespace);

        try {
            clients.run(client -> {
                client.createDatabase(convertToDatabase(namespace, meta));
                return null;
            });

            LOG.info("Created namespace: {}", namespace);

        } catch (AlreadyExistsException e) {
            throw new org.apache.iceberg.exceptions.AlreadyExistsException(e, "Namespace '%s' already exists!",
                    namespace);

        } catch (TException e) {
            throw new RuntimeException("Failed to create namespace " + namespace + " in Hive Metastore", e);

        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(
                    "Interrupted in call to createDatabase(name) " + namespace + " in Hive Metastore", e);
        }
    }

    @Override
    public List<Namespace> listNamespaces(Namespace namespace) {
        if (!isValidateNamespace(namespace) && !namespace.isEmpty()) {
            throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace);
        }
        if (!namespace.isEmpty()) {
            return ImmutableList.of();
        }
        try {
            List<Namespace> namespaces = clients.run(IMetaStoreClient::getAllDatabases)
                    .stream()
                    .map(Namespace::of)
                    .collect(Collectors.toList());

            LOG.debug("Listing namespace {} returned tables: {}", namespace, namespaces);
            return namespaces;

        } catch (TException e) {
            throw new RuntimeException("Failed to list all namespace: " + namespace + " in Hive Metastore",  e);

        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(
                    "Interrupted in call to getAllDatabases() " + namespace + " in Hive Metastore", e);
        }
    }

    @Override
    public boolean dropNamespace(Namespace namespace) {
        if (!isValidateNamespace(namespace)) {
            return false;
        }

        try {
            clients.run(client -> {
                client.dropDatabase(namespace.level(0),
                        false /* deleteData */,
                        false /* ignoreUnknownDb */,
                        false /* cascade */);
                return null;
            });

            LOG.info("Dropped namespace: {}", namespace);
            return true;

        } catch (InvalidOperationException e) {
            throw new NamespaceNotEmptyException(e, "Namespace %s is not empty. One or more tables exist.", namespace);

        } catch (NoSuchObjectException e) {
            return false;

        } catch (TException e) {
            throw new RuntimeException("Failed to drop namespace " + namespace + " in Hive Metastore", e);

        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(
                    "Interrupted in call to drop dropDatabase(name) " + namespace + " in Hive Metastore", e);
        }
    }

    @Override
    public boolean setProperties(Namespace namespace,  Map<String, String> properties) {
        Map<String, String> parameter = Maps.newHashMap();

        parameter.putAll(loadNamespaceMetadata(namespace));
        parameter.putAll(properties);
        Database database = convertToDatabase(namespace, parameter);

        alterHiveDataBase(namespace, database);
        LOG.debug("Successfully set properties {} for {}", properties.keySet(), namespace);

        // Always successful, otherwise exception is thrown
        return true;
    }

    @Override
    public boolean removeProperties(Namespace namespace,  Set<String> properties) {
        Map<String, String> parameter = Maps.newHashMap();

        parameter.putAll(loadNamespaceMetadata(namespace));
        properties.forEach(key -> parameter.put(key, null));
        Database database = convertToDatabase(namespace, parameter);

        alterHiveDataBase(namespace, database);
        LOG.debug("Successfully removed properties {} from {}", properties, namespace);

        // Always successful, otherwise exception is thrown
        return true;
    }

    private void alterHiveDataBase(Namespace namespace,  Database database) {
        try {
            clients.run(client -> {
                client.alterDatabase(namespace.level(0), database);
                return null;
            });

        } catch (NoSuchObjectException | UnknownDBException e) {
            throw new NoSuchNamespaceException(e, "Namespace does not exist: %s", namespace);

        } catch (TException e) {
            throw new RuntimeException(
                    "Failed to list namespace under namespace: " + namespace + " in Hive Metastore", e);

        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(
                    "Interrupted in call to getDatabase(name) " + namespace + " in Hive Metastore", e);
        }
    }

    @Override
    public Map<String, String> loadNamespaceMetadata(Namespace namespace) {
        if (!isValidateNamespace(namespace)) {
            throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace);
        }

        try {
            Database database = clients.run(client -> client.getDatabase(namespace.level(0)));
            Map<String, String> metadata = convertToMetadata(database);
            LOG.debug("Loaded metadata for namespace {} found {}", namespace, metadata.keySet());
            return metadata;

        } catch (NoSuchObjectException | UnknownDBException e) {
            throw new NoSuchNamespaceException(e, "Namespace does not exist: %s", namespace);

        } catch (TException e) {
            throw new RuntimeException(
                    "Failed to list namespace under namespace: " + namespace + " in Hive Metastore", e);

        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(
                    "Interrupted in call to getDatabase(name) " + namespace + " in Hive Metastore", e);
        }
    }

    @Override
    protected boolean isValidIdentifier(TableIdentifier tableIdentifier) {
        return tableIdentifier.namespace().levels().length == 1;
    }

    private TableIdentifier removeCatalogName(TableIdentifier to) {
        if (isValidIdentifier(to)) {
            return to;
        }

        // check if the identifier includes the catalog name and remove it
        if (to.namespace().levels().length == 2 && name().equalsIgnoreCase(to.namespace().level(0))) {
            return TableIdentifier.of(Namespace.of(to.namespace().level(1)), to.name());
        }

        // return the original unmodified
        return to;
    }

    private boolean isValidateNamespace(Namespace namespace) {
        return namespace.levels().length == 1;
    }

    @Override
    public TableOperations newTableOps(TableIdentifier tableIdentifier) {
        String dbName = tableIdentifier.namespace().level(0);
        String tableName = tableIdentifier.name();
        return new HiveTableOperations(conf, clients, fileIO, name, dbName, tableName);
    }

    @Override
    protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
        // This is a little edgy since we basically duplicate the HMS location generation logic.
        // Sadly I do not see a good way around this if we want to keep the order of events, like:
        // - Create meta files
        // - Create the metadata in HMS, and this way committing the changes

        // Create a new location based on the namespace / database if it is set on database level
        try {
            Database databaseData = clients.run(client -> client.getDatabase(tableIdentifier.namespace().levels()[0]));
            if (databaseData.getLocationUri() != null) {
                // If the database location is set use it as a base.
                return String.format("%s/%s", databaseData.getLocationUri(), tableIdentifier.name());
            }

        } catch (TException e) {
            throw new RuntimeException(String.format("Metastore operation failed for %s", tableIdentifier), e);

        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted during commit", e);
        }

        // Otherwise stick to the {WAREHOUSE_DIR}/{DB_NAME}.db/{TABLE_NAME} path
        String warehouseLocation = getWarehouseLocation();
        return String.format(
                "%s/%s.db/%s",
                warehouseLocation,
                tableIdentifier.namespace().levels()[0],
                tableIdentifier.name());
    }

    private String getWarehouseLocation() {
        String warehouseLocation = conf.get(HiveConf.ConfVars.METASTOREWAREHOUSE.varname);
        Preconditions.checkNotNull(warehouseLocation,
                "Warehouse location is not set: hive.metastore.warehouse.dir=null");
        return warehouseLocation;
    }

    private Map<String, String> convertToMetadata(Database database) {

        Map<String, String> meta = Maps.newHashMap();

        meta.putAll(database.getParameters());
        meta.put("location", database.getLocationUri());
        if (database.getDescription() != null) {
            meta.put("comment", database.getDescription());
        }

        return meta;
    }

    Database convertToDatabase(Namespace namespace, Map<String, String> meta) {
        if (!isValidateNamespace(namespace)) {
            throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace);
        }

        Database database = new Database();
        Map<String, String> parameter = Maps.newHashMap();

        database.setName(namespace.level(0));
        database.setLocationUri(new Path(getWarehouseLocation(), namespace.level(0)).toString() + ".db");

        meta.forEach((key, value) -> {
            if (key.equals("comment")) {
                database.setDescription(value);
            } else if (key.equals("location")) {
                database.setLocationUri(value);
            } else {
                if (value != null) {
                    parameter.put(key, value);
                }
            }
        });
        database.setParameters(parameter);

        return database;
    }

    @Override
    public String toString() {
        return MoreObjects.toStringHelper(this)
                .add("name", name)
                .add("uri", this.conf == null ? "" : this.conf.get(HiveConf.ConfVars.METASTOREURIS.varname))
                .toString();
    }

    @Override
    public void setConf(Configuration conf) {
        this.conf = new Configuration(conf);
    }

    @Override
    public Configuration getConf() {
        return conf;
    }

    @VisibleForTesting
    void setListAllTables(boolean listAllTables) {
        this.listAllTables = listAllTables;
    }
}
